import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statistics import mean

from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import plotly.io as pio

# Use pio to ensure plotly plots render in Quarto
pio.renderers.default = 'notebook_connected'

Helper Functions

import plotly.graph_objects as go

def plot_3d_regression(model, model_name, data_type='linear'):
    """Fit *model* on a synthetic 2-feature dataset and render a 3-D plot of
    the training points together with the model's prediction surface.

    Parameters
    ----------
    model : object
        Any estimator exposing ``fit(X, y)`` and ``predict(X)``.
    model_name : str
        Label used in the plot title.
    data_type : str
        'linear' (default) uses the raw make_regression data,
        'non-linear' adds smooth sinusoidal structure,
        'ugly' adds curvature, interactions, and extra unseeded noise.
        Matching is case-insensitive.
    """
    # Base dataset: 300 samples, 2 features, moderate noise, fixed seed.
    X, y = make_regression(
        n_samples=300,
        n_features=2,
        noise=15,
        random_state=42,
    )

    # Normalize the flag once. The original lower-cased only the
    # 'non-linear' comparison, so data_type='Ugly' silently fell through.
    data_type = data_type.lower()

    # Standard non-linear data: smooth sinusoidal target surface.
    if data_type == 'non-linear':
        y = y + 500 * np.sin(X[:, 0]) + 250 * np.cos(X[:, 1])

    # Non-standard "ugly" data.
    if data_type == 'ugly':
        # Non-linear transformations.
        y = y + 500 * np.sin(X[:, 0]) * np.cos(X[:, 1]) \
              + 300 * (X[:, 0] ** 2) \
              - 200 * np.exp(-X[:, 1] ** 2)

        # Feature interaction term.
        y = y + 100 * (X[:, 0] * X[:, 1])

        # Extra noise to make it "uglier".
        # NOTE(review): unseeded, so 'ugly' data varies run to run.
        y = y + np.random.normal(0, 150, size=y.shape)

    model.fit(X, y)

    # Dense grid over the observed feature ranges for the surface.
    x_grid, y_grid = np.meshgrid(np.linspace(min(X[:, 0]), max(X[:, 0]), 100),
                                 np.linspace(min(X[:, 1]), max(X[:, 1]), 100))
    grid = np.vstack((x_grid.flatten(), y_grid.flatten())).T

    predictions = model.predict(grid)

    # 3-D scatter of the training data.
    scatter = go.Scatter3d(
        x=X[:, 0], y=X[:, 1], z=y,
        mode='markers', marker=dict(color='blue', size=5), name='Training Data'
    )

    # 3-D surface of the model's predictions over the grid.
    surface = go.Surface(
        x=x_grid, y=y_grid, z=predictions.reshape(x_grid.shape), opacity=0.5, colorscale='Viridis', name='Regression Surface'
    )

    # Combine both traces into one figure.
    fig = go.Figure(data=[scatter, surface])

    # Axis labels and title.
    fig.update_layout(
        title=f'Training Data and Regression Surface for {model_name}',
        scene=dict(
            xaxis_title='Feature 1',
            yaxis_title='Feature 2',
            zaxis_title='Target'
        )
    )

    # Render the interactive plot.
    fig.show()

Linear Regression

OLS

class ols_regression():
    """Ordinary least squares linear regression via the normal equations."""

    def __init__(self):
        # Stateless until fit(); coef and bias are set there.
        pass

    def fit(self, X, y):
        '''Estimate coefficients and bias from training data.

        Solves (X^T X) beta = X^T y with ``np.linalg.solve`` instead of
        explicitly inverting X^T X — faster and numerically safer.
        '''
        X = np.asarray(X, dtype=float)

        # Accept 1-D feature input, mirroring the reshape in predict().
        if X.ndim == 1:
            X = X.reshape(-1, 1)

        # Prepend a column of ones so the first weight is the intercept.
        X = np.hstack((np.ones((X.shape[0], 1)), X))

        # Normal equations, solved without forming an explicit inverse.
        beta = np.linalg.solve(X.T @ X, X.T @ y)
        self.coef = beta[1:]
        self.bias = beta[0]

    def predict(self, X):
        '''Predict new data with the trained coefficients and bias.'''
        X = np.asarray(X, dtype=float)

        # Reshape 1-D input to a single-feature column.
        if X.ndim == 1:
            X = X.reshape(-1, 1)

        # Stored on the instance for parity with the original implementation.
        self.predictions = X.dot(self.coef) + self.bias

        return self.predictions
# Instantiate the OLS model and visualize its fit on both datasets.
ols = ols_regression()

plot_3d_regression(model=ols, model_name='OLS', data_type='linear')
plot_3d_regression(model=ols, model_name='OLS', data_type='ugly')

Gradient Descent Regression

class GDRegression():
    """Linear regression trained with (mini-batch) gradient descent on MSE."""

    def __init__(self, epochs, eta):
        '''Store the number of training epochs and the learning rate eta.'''
        self.epochs = epochs
        self.eta = eta

    def fit(self, X, y, batch_size=None):
        '''Learn bias and coefficients by gradient descent.

        batch_size : int or None
            Rows sampled (without replacement) per step. None means
            full-batch; the legacy string sentinel "TBD" is still accepted
            for backward compatibility.
        '''
        # Default to full-batch gradient descent.
        if batch_size is None or batch_size == "TBD":
            batch_size = X.shape[0]

        # Random initialization of the parameters.
        bias = np.random.random()
        coef = np.random.random(X.shape[1])

        for _ in range(self.epochs):
            # Sample a batch of rows without replacement.
            indices = np.random.choice(X.shape[0], size=min(batch_size, len(y)), replace=False)
            X_batch = X[indices]
            y_batch = y[indices]

            # Forward pass on the batch.
            y_hat = X_batch.dot(coef) + bias

            # Gradients of the MSE loss w.r.t. bias and coefficients
            # (np.sum instead of the pure-Python builtin sum).
            error = y_hat - y_batch
            grad_b = 2 / X_batch.shape[0] * np.sum(error)
            grad_c = 2 / X_batch.shape[0] * X_batch.T.dot(error)

            # Gradient step.
            bias = bias - grad_b * self.eta
            coef = coef - grad_c * self.eta

        # Finalize the learned parameters.
        self.bias = bias
        self.coef = coef

    def predict(self, X):
        '''Predict targets using the learned bias and coefficients.'''
        return X.dot(self.coef) + self.bias

        
# Train gradient-descent regression and visualize it on both datasets.
gd_reg = GDRegression(epochs=10000, eta=0.01)
plot_3d_regression(gd_reg, model_name='Gradient Descent', data_type='linear')
plot_3d_regression(gd_reg, model_name='Gradient Descent', data_type='ugly')

KNN Regression

class KNNRegressor():
    """k-nearest-neighbors regression using Euclidean distance."""

    def __init__(self, n_neighbors=5):
        '''Initialize the regressor with a defined number of nearest neighbors.'''
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        '''Store the training data (lazy learner: no model is built here).'''
        self.X = X
        self.y = y

    def predict(self, X):
        '''Predict each point as the mean target of its k nearest training points.'''
        # Collect into a list and convert once at the end; the original
        # np.append-per-iteration reallocated the array every step (O(n^2)).
        predictions = []

        for test_point in X:
            # Euclidean distance from the test point to every training point.
            distances = np.linalg.norm(self.X - test_point, axis=1)

            # Indices of the n_neighbors closest training points.
            closest_points_indices = np.argsort(distances)[:self.n_neighbors]

            # np.mean instead of statistics.mean: C-speed on ndarrays
            # (statistics.mean is pure Python and much slower here).
            predictions.append(np.mean(self.y[closest_points_indices]))

        return np.array(predictions)
# KNN with the default five neighbors, visualized on both datasets.
knn_regressor = KNNRegressor(n_neighbors=5)

plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='linear')
plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='ugly')
class DecisionTreeRegressor():
    """CART-style regression tree that greedily minimizes weighted MSE.

    The fitted tree is nested dicts {'feature', 'value', 'left', 'right'};
    leaves are floats (the mean target of the samples reaching them).
    """

    def __init__(self, max_depth=None):
        # max_depth=None grows the tree until every leaf is pure.
        self.max_depth = max_depth

    def mse(self, y):
        """Node impurity: mean squared deviation of y from its mean."""
        return np.mean((y - np.mean(y)) ** 2)

    def _best_split(self, X, y):
        """Find the lowest-weighted-MSE split over all features and values.

        Returns (split_value, feature_index, left_y, right_y); all four are
        None when no feature value separates the samples. Uses local
        variables rather than instance attributes so a failed search cannot
        raise AttributeError or return stale results from an earlier call
        (the original initialized self.best_left_y but assigned self.left_y).
        """
        best_mse = float('inf')
        best_feature = None
        best_split_value = None
        best_left_y = None
        best_right_y = None

        # Try every unique value of every feature as a split threshold.
        for feature_num in range(X.shape[1]):
            column = X[:, feature_num]
            for value in np.unique(column):
                left_index = column <= value
                left_targets = y[left_index]
                right_targets = y[~left_index]

                # Skip degenerate splits where one side is empty.
                if len(left_targets) == 0 or len(right_targets) == 0:
                    continue

                # Weighted average of the two children's impurities.
                total_average_mse = (self.mse(left_targets) * len(left_targets)
                                     + self.mse(right_targets) * len(right_targets)) / len(y)

                if total_average_mse < best_mse:
                    best_mse = total_average_mse
                    best_feature = feature_num
                    best_split_value = value
                    best_left_y = left_targets
                    best_right_y = right_targets

        return best_split_value, best_feature, best_left_y, best_right_y

    def _build_tree(self, X, y, depth=0):
        """Recursively grow the tree; returns a dict node or a float leaf."""
        # Stop on a pure node or when the depth limit is reached.
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.mean(y)

        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)

        # No valid split (e.g. all rows identical): emit a leaf.
        if best_feature is None:
            return np.mean(y)

        # Partition rows by the chosen threshold ('<=' goes left).
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value

        # Recurse into both partitions.
        left_tree = self._build_tree(X[left_index], left_y, depth + 1)
        right_tree = self._build_tree(X[right_index], right_y, depth + 1)

        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree
        }

    def _single_prediction(self, tree, x):
        """Walk the tree for one sample x and return the leaf value."""
        if isinstance(tree, dict):
            # '<=' mirrors the training-time split rule; the original used
            # '<' here, sending samples equal to a split value the wrong way.
            if x[tree['feature']] <= tree['value']:
                return self._single_prediction(tree['left'], x)   # Go left
            else:
                return self._single_prediction(tree['right'], x)  # Go right
        # Leaf node: the stored mean target.
        else:
            return tree

    def predict(self, X):
        """Predict a target for each row of X by traversing the tree."""
        return np.array([self._single_prediction(self.tree, x) for x in X])

    def fit(self, X, y):
        """Grow the tree on the training data."""
        self.tree = self._build_tree(X, y)
# Fully grown decision tree (no depth limit), visualized on both datasets.
dt_reg = DecisionTreeRegressor(max_depth=None)
plot_3d_regression(dt_reg, "Decision Tree", data_type='linear')
plot_3d_regression(dt_reg, "Decision Tree", data_type='ugly')

Random Forest Regression

class RandomForestRegressor():
    """Bagged ensemble of regression trees with random feature subsets.

    Each of n_estimators trees is grown on a bootstrap sample of the rows,
    every split considers a random ~1/3 of the features, and predictions
    are the mean of the per-tree predictions.
    """

    def __init__(self, n_estimators, max_depth=None):
        # Number of trees and an optional depth cap shared by all of them.
        self.n_estimators = n_estimators
        self.max_depth = max_depth

    def mse(self, y):
        """Node impurity: mean squared deviation of y from its mean."""
        return np.mean((y - np.mean(y)) ** 2)

    def _best_split(self, X, y):
        """Best weighted-MSE split over a random subset of the features.

        Returns (split_value, feature_index, left_y, right_y) with
        feature_index in the ORIGINAL feature numbering; all four are None
        when no valid split exists. Uses local variables so a failed search
        cannot leak stale state between calls — the original assigned
        self.left_y but initialized self.best_left_y, so a tree built later
        could silently receive leftovers from an earlier tree.
        """
        best_mse = float('inf')
        best_feature = None
        best_split_value = None
        best_left_y = None
        best_right_y = None

        # Randomly sample ~1/3 of the features (at least one), no repeats.
        n_features_to_consider = max(1, X.shape[1] // 3)
        candidate_features = np.random.choice(X.shape[1], size=n_features_to_consider, replace=False)

        for feature_num in candidate_features:
            column = X[:, feature_num]
            for value in np.unique(column):
                left_index = column <= value
                left_targets = y[left_index]
                right_targets = y[~left_index]

                # Both sides must be non-empty.
                if len(left_targets) == 0 or len(right_targets) == 0:
                    continue

                # Weighted average of the two children's impurities.
                total_average_mse = (self.mse(left_targets) * len(left_targets)
                                     + self.mse(right_targets) * len(right_targets)) / len(y)

                if total_average_mse < best_mse:
                    best_mse = total_average_mse
                    best_feature = feature_num
                    best_split_value = value
                    best_left_y = left_targets
                    best_right_y = right_targets

        return best_split_value, best_feature, best_left_y, best_right_y

    def _build_tree(self, X, y, depth=0):
        """Recursively grow one tree; returns a dict node or a float leaf."""
        # Stop on a pure node or when the depth limit is reached.
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.mean(y)

        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)

        # No valid split among the sampled features: emit a leaf.
        if best_feature is None:
            return np.mean(y)

        # Partition rows by the chosen threshold ('<=' goes left).
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value

        left_tree = self._build_tree(X[left_index], left_y, depth + 1)
        right_tree = self._build_tree(X[right_index], right_y, depth + 1)

        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree
        }

    def _single_prediction(self, tree, x):
        """Walk one tree for sample x and return the leaf value."""
        if isinstance(tree, dict):
            # '<=' mirrors the training-time split rule; the original used
            # '<', misrouting samples equal to a split value.
            if x[tree['feature']] <= tree['value']:
                return self._single_prediction(tree['left'], x)   # Go left
            else:
                return self._single_prediction(tree['right'], x)  # Go right
        # Leaf node: the stored mean target.
        else:
            return tree

    def predict(self, X):
        """Average the per-tree predictions for each row of X."""
        per_tree = []
        for tree in self.trees:
            per_tree.append(np.array([self._single_prediction(tree, x) for x in X]))

        # Stack to (n_samples, n_trees) and average across the trees.
        return np.mean(np.column_stack(per_tree), axis=1)

    def fit(self, X, y):
        """Train n_estimators trees, each on a bootstrap sample of the rows."""
        models = []
        for _ in range(self.n_estimators):
            # Bootstrap: sample rows with replacement.
            random_row_indices = np.random.choice(len(X), len(X), replace=True)
            models.append(self._build_tree(X[random_row_indices], y[random_row_indices]))

        # Save all the trained trees.
        self.trees = models
# Ten-tree random forest, visualized on both datasets.
rf_regressor = RandomForestRegressor(n_estimators=10)
plot_3d_regression(rf_regressor, "Random Forest Regressor", data_type="linear")
plot_3d_regression(rf_regressor, "Random Forest Regressor", data_type="ugly")

Gradient Boosting Regression

class GradientBoostingRegressor:
    """Gradient boosting for squared-error regression.

    Starts from the target mean and repeatedly fits a shallow
    DecisionTreeRegressor (defined earlier in this file) to the residuals,
    adding each tree's prediction scaled by the learning rate.
    """

    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3):
        # Number of boosting rounds, shrinkage factor, and per-tree depth cap.
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth

    def fit(self, X, y):
        """Fit n_estimators residual trees to (X, y)."""
        # Base prediction is the target mean, stored separately so predict()
        # can reuse the exact training-time baseline. The original recomputed
        # it as np.mean(self.y_pred) — the mean of the FINAL boosted fit,
        # which drifts away from mean(y) as updates accumulate. float64
        # replaces the original float32 for precision.
        self.init_prediction = np.mean(y)
        self.y_pred = np.full(np.shape(y), self.init_prediction, dtype=float)
        self.trees = []

        for _ in range(self.n_estimators):
            # Pseudo-residuals for squared-error loss.
            residuals = y - self.y_pred

            # Fit a shallow tree to the residuals.
            tree = DecisionTreeRegressor(max_depth=self.max_depth)
            tree.fit(X, residuals)

            # Advance the running prediction by the shrunken tree output.
            self.y_pred = self.y_pred + self.learning_rate * tree.predict(X)
            self.trees.append(tree)

    def predict(self, X):
        """Predict by starting at the training-time base value and summing
        every tree's shrunken residual prediction."""
        y_pred = np.full(X.shape[0], self.init_prediction, dtype=float)

        for tree in self.trees:
            y_pred += self.learning_rate * tree.predict(X)
        return y_pred
# Fifty-round gradient boosting, visualized on both datasets.
boost_regressor = GradientBoostingRegressor(n_estimators=50)
plot_3d_regression(boost_regressor, "Gradient Boost Regressor", data_type="linear")
plot_3d_regression(boost_regressor, "Gradient Boost Regressor", data_type="ugly")